package com.shujia.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo4KafkaSInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> studentDS = env.readTextFile("data/student_json");

        //创建kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("students")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//数据处理的语义
                .build();

        //使用kafka sink
        studentDS.sinkTo(sink);

        env.execute();

    }
}
